# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

import logging
from typing import Optional

from libcommon.constants import DATASET_SEPARATOR, PARQUET_REVISION
from libcommon.dtos import JobInfo, SplitHubFile
from libcommon.exceptions import (
    FileSystemError,
    ParquetResponseEmptyError,
    PreviousStepFormatError,
)
from libcommon.parquet_utils import extract_split_directory_from_parquet_url
from libcommon.simple_cache import get_previous_step_or_raise
from libcommon.storage import StrPath
from libcommon.viewer_utils.parquet_metadata import create_parquet_metadata_dir, create_parquet_metadata_file
from pyarrow.parquet import ParquetFile

from worker.config import AppConfig
from worker.dtos import (
    CompleteJobResult,
    ConfigParquetMetadataResponse,
    ParquetFileMetadataItem,
)
from worker.job_runners.config.config_job_runner import ConfigJobRunner
from worker.utils import hffs_parquet_url, retry_on_arrow_invalid_open_file

try:
    import libviewer as lv  # type: ignore
except ImportError:
    pass
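# `libviewer` is an optional dependency: if the import fails, `lv` stays undefined and
# compute_parquet_metadata_response will raise a NameError when called.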


def create_parquet_metadata_file_from_remote_parquet(
    parquet_file_item: SplitHubFile, hf_endpoint: str, hf_token: Optional[str], parquet_metadata_directory: StrPath
) -> ParquetFileMetadataItem:
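    """Read the footer of a remote parquet file, write its metadata to disk with
    create_parquet_metadata_file, and return the corresponding ParquetFileMetadataItem."""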
    split_directory = extract_split_directory_from_parquet_url(parquet_file_item["url"])
    hfh_parquet_file_path = hffs_parquet_url(
        repo_id=parquet_file_item["dataset"],
        config=parquet_file_item["config"],
        split_directory=split_directory,
        filename=parquet_file_item["filename"],
    )
    try:
        f = retry_on_arrow_invalid_open_file(
            file_url=hfh_parquet_file_path, hf_endpoint=hf_endpoint, hf_token=hf_token, revision=PARQUET_REVISION
        )
        try:
            # reading .metadata parses the parquet footer; the resulting object is fully in memory,
            # so the remote file can be closed right away
            parquet_file_metadata = ParquetFile(f).metadata
        finally:
            f.close()
    except Exception as e:
        raise FileSystemError(f"Could not read the parquet file: {e}") from e
    split = parquet_file_item["url"].split("/")[-2]
    # ^ the split directory is read from the URL instead of parquet_file_item["split"]: to support
    # more than 10k parquet files, the subdirectories can be "train-part0", "train-part1", etc.
    # instead of "train" (see https://github.com/huggingface/dataset-viewer/issues/2768)
    parquet_metadata_subpath = create_parquet_metadata_file(
        dataset=parquet_file_item["dataset"],
        config=parquet_file_item["config"],
        split=split,
        parquet_file_metadata=parquet_file_metadata,
        filename=parquet_file_item["filename"],
        parquet_metadata_directory=parquet_metadata_directory,
    )
    return ParquetFileMetadataItem(
        dataset=parquet_file_item["dataset"],
        config=parquet_file_item["config"],
        split=parquet_file_item["split"],
        url=parquet_file_item["url"],
        filename=parquet_file_item["filename"],
        size=parquet_file_item["size"],
        num_rows=parquet_file_metadata.num_rows,
        parquet_metadata_subpath=parquet_metadata_subpath,
    )


def compute_parquet_metadata_response(
    dataset: str,
    config: str,
    hf_endpoint: str,
    hf_token: Optional[str],
    data_store: Optional[str],
    parquet_metadata_directory: StrPath,
) -> ConfigParquetMetadataResponse:
    """
    Get the response of 'config-parquet-metadata' for one specific dataset and config on huggingface.co.
    Store the config's parquet metadata on disk and return the list of local metadata files.

    Args:
        dataset (`str`):
            A namespace (user or an organization) and a repo name separated
            by a `/`.
        config (`str`):
            A configuration name.
        hf_endpoint (`str`):
            The Hub endpoint (for example: "https://huggingface.co").
        hf_token (`str`, *optional*):
            An authentication token (see https://huggingface.co/settings/token).
        data_store (`str`, *optional*):
            The data store used to access the parquet files.
        parquet_metadata_directory (`str` or `pathlib.Path`):
            The directory where the parquet metadata files are stored.

    Raises:
        [~`libcommon.simple_cache.CachedArtifactError`]:
            If the previous step gave an error.
        [~`libcommon.exceptions.PreviousStepFormatError`]:
            If the content of the previous step does not have the expected format.
        [~`libcommon.exceptions.ParquetResponseEmptyError`]:
            If the previous step provided an empty list of parquet files.
        [~`libcommon.exceptions.FileSystemError`]:
            If the HfFileSystem couldn't access the parquet files.

    Returns:
        `ConfigParquetMetadataResponse`: An object with the list of parquet metadata files.
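
    Example:
        A minimal illustrative call; all values below are placeholders, not real datasets or paths:

        ```python
        response = compute_parquet_metadata_response(
            dataset="user/dataset",
            config="default",
            hf_endpoint="https://huggingface.co",
            hf_token=None,
            data_store=None,
            parquet_metadata_directory="/parquet_metadata",
        )
        parquet_files_metadata = response["parquet_files_metadata"]
        ```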
    """
    logging.info(f"compute 'config-parquet-metadata' for {dataset=} {config=}")

    config_parquet_response = get_previous_step_or_raise(kind="config-parquet", dataset=dataset, config=config)
    try:
        parquet_files_content = config_parquet_response["content"]["parquet_files"]
        parquet_file_items: list[SplitHubFile] = [
            parquet_file_item for parquet_file_item in parquet_files_content if parquet_file_item["config"] == config
        ]
        if not parquet_file_items:
            raise ParquetResponseEmptyError("No parquet files found.")
        content = config_parquet_response["content"]
        if "features" in content and isinstance(content["features"], dict):
            features = content["features"]  # config-parquet version<6 didn't have features
        else:
            # (July 23) we can remove this later and raise an error instead (can be None for backward compatibility)
            features = None
        partial = config_parquet_response["content"]["partial"]
    except Exception as e:
        raise PreviousStepFormatError("Previous step did not return the expected content.") from e

    # get the split directory name from the first parquet url ("train", or "train-part0" for sharded splits)
    first_parquet_file_item = parquet_file_items[0]
    split = first_parquet_file_item["url"].split("/")[-2]

    # create the parquet metadata directory; the returned path and subpath are unused,
    # the call is only needed for its side effect of creating the directory
    _dir_path, _parquet_metadata_dir_subpath = create_parquet_metadata_dir(
        dataset=dataset,
        config=config,
        split=split,
        parquet_metadata_directory=parquet_metadata_directory,
    )

    # construct the required parquet_files list for libviewer.Dataset
    files = []
    for parquet_file_item in parquet_file_items:
        split = parquet_file_item["url"].split("/")[-2]
        # ^ to support more than 10k parquet files, the split subdirectories can be "train-part0",
        # "train-part1", etc. instead of "train" (see https://github.com/huggingface/dataset-viewer/issues/2768)
        parquet_metadata_dir_subpath = f"{dataset}/{DATASET_SEPARATOR}/{config}/{split}"
        files.append(
            {
                "path": f"{parquet_file_item['config']}/{split}/{parquet_file_item['filename']}",
                "size": parquet_file_item["size"],
                "num_rows": None,
                "metadata_path": f"{parquet_metadata_dir_subpath}/{parquet_file_item['filename']}",
            }
        )
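    # an entry in `files` looks like (illustrative values, assuming DATASET_SEPARATOR == "--"):
    # {"path": "default/train/0000.parquet", "size": 12345, "num_rows": None,
    #  "metadata_path": "user/dataset/--/default/train/0000.parquet"}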

    # instantiate libviewer.Dataset and sync index to create parquet metadata files
    viewer = lv.Dataset(
        name=dataset,
        files=files,
        revision="refs/convert/parquet",
        hf_token=hf_token,
        hf_endpoint=hf_endpoint,
        data_store=data_store,
        metadata_store=f"file://{parquet_metadata_directory}",
    )
    result = viewer.sync_index()
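    # ^ sync_index is assumed to read the remote parquet footers, write the metadata files under
    # metadata_store, and return one entry per input file, in the same order as `files`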

    # construct the expected response format with num_rows from the result
    parquet_files_metadata: list[ParquetFileMetadataItem] = [
        {
            "dataset": item["dataset"],
            "config": item["config"],
            "split": item["split"],
            "url": item["url"],
            "filename": item["filename"],
            "size": item["size"],
            "num_rows": res["num_rows"],
            "parquet_metadata_subpath": res["metadata_path"],
        }
        for item, res in zip(parquet_file_items, result)
    ]

    return ConfigParquetMetadataResponse(
        parquet_files_metadata=parquet_files_metadata, features=features, partial=partial
    )


class ConfigParquetMetadataJobRunner(ConfigJobRunner):
    parquet_metadata_directory: StrPath
    data_store: Optional[str]

    @staticmethod
    def get_job_type() -> str:
        return "config-parquet-metadata"

    def __init__(
        self,
        job_info: JobInfo,
        app_config: AppConfig,
        parquet_metadata_directory: StrPath,
        data_store: Optional[str] = None,
    ) -> None:
        super().__init__(
            job_info=job_info,
            app_config=app_config,
        )
        self.data_store = data_store
        self.parquet_metadata_directory = parquet_metadata_directory

    def compute(self) -> CompleteJobResult:
        return CompleteJobResult(
            compute_parquet_metadata_response(
                dataset=self.dataset,
                config=self.config,
                hf_endpoint=self.app_config.common.hf_endpoint,
                hf_token=self.app_config.common.hf_token,
                data_store=self.data_store,
                parquet_metadata_directory=self.parquet_metadata_directory,
            )
        )
